package spring.batch.config;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import spring.batch.entity.SysUser;


/**
 * cvs 批处理 配置类
 *
 * @author blues
 */
@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {

    @Bean
    @StepScope
    public FlatFileItemReader<SysUser> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
        FlatFileItemReader<SysUser> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource(pathToFile));
        reader.setLineMapper(new DefaultLineMapper<SysUser>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "id", "name"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<SysUser>() {{
                setTargetType(SysUser.class);
            }});
        }});
        return reader;
    }



    @Bean
    public ItemProcessor<SysUser, SysUser> processor() {
        // 使用自定义的ItemProcessor的实现CsvItemProcessor
        CsvItemProcessor processor = new CsvItemProcessor();
        //指定校验器
        processor.setValidator(csvBeanValidator());
        return processor;
    }


    /**
     * dataSource 在配置文件中已定义
     * @param dataSource
     * @return
     */
    @Bean
    public ItemWriter<SysUser> writer(DataSource dataSource) {
        // 使用JDBC的批处理JdbcBatchItemWriter来写数据到数据库
        JdbcBatchItemWriter<SysUser> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<SysUser>());
        String sql = "INSERT INTO SYS_USER " + "(ID,NAME) values(:id,:name)";
        //设置要执行的批处理SQL语句
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }


    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                //指定step
                .flow(step)
                .end()
                //绑定监听器
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<SysUser> reader, ItemWriter<SysUser> writer, ItemProcessor<SysUser,SysUser> processor) {
        return stepBuilderFactory.get("step")
                //定义每次提交的数据数量
                .<SysUser, SysUser>chunk(65000)
                //step绑定reader
                .reader(reader)
                //step绑定processor
                .processor(processor)
                //step绑定writer
                .writer(writer)
                .build();
    }



    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<SysUser> csvBeanValidator() {
        return new CsvBeanValidator<SysUser>();
    }


}
